[SPARK-5479] [yarn] Handle --py-files correctly in YARN. #6360
Conversation
The bug description is a little misleading: the actual issue is that .py files are not handled correctly when distributed by YARN. They're added to "spark.submit.pyFiles", which, when processed by context.py, explicitly whitelists certain extensions (see PACKAGE_EXTENSIONS), and that does not include .py files. On top of that, archives were not handled at all! They made it to the driver's python path, but never made it to executors, since the mechanism used to propagate their location (spark.submit.pyFiles) only works on the driver side.

So, instead, ignore "spark.submit.pyFiles" and just build PYTHONPATH correctly for both driver and executors. Individual .py files are placed in a subdirectory of the container's local dir in the cluster, which is then added to the python path. Archives are added directly.

The change, as a side effect, ends up solving the symptom described in the bug. The issue was not that the files were not being distributed, but that they were never made visible to the python application running under Spark.

Also included is a proper unit test for running python on YARN, which broke in several different ways with the previous code.

A short walkthrough of the changes:
- SparkSubmit no longer tries to be smart about how YARN handles python files. It just passes the configs down to the YARN client code.
- The YARN client distributes python files and archives differently, placing the files in a subdirectory.
- The YARN client now sets PYTHONPATH for the processes it launches; to properly handle different locations, it uses YARN's support for embedding env variables. To avoid YARN expanding those at the wrong time, SparkConf is now propagated to the AM using a conf file instead of command line options.
- Because the Client initialization code is a maze of implicit dependencies, some code needed to be moved around to make sure all needed state was available when the code ran.
- The pyspark tests in YarnClusterSuite now actually distribute and try to use both a python file and an archive containing a different python module. A yarn-client test was also added for completeness.
- Some of the code around distributing files to YARN was cleaned up, to avoid adding more copied & pasted code to handle the new files being distributed.
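To illustrate the whitelist behaviour mentioned above, here is a minimal sketch. The constant name mirrors PACKAGE_EXTENSIONS in pyspark's context.py, but the exact tuple value and the helper function are assumptions for illustration only:

```python
# Minimal sketch of the extension whitelist behaviour described above.
# PACKAGE_EXTENSIONS mirrors the constant in pyspark's context.py; the exact
# tuple value and the helper are illustrative assumptions, not the real code.
PACKAGE_EXTENSIONS = ('.zip', '.egg', '.jar')

def paths_added_to_sys_path(py_files):
    """Return only the entries that a whitelist check like this would keep."""
    added = []
    for path in py_files:
        if path.lower().endswith(PACKAGE_EXTENSIONS):
            added.append(path)  # .zip/.egg/.jar are accepted
        # plain .py files fall through and are silently ignored
    return added

print(paths_added_to_sys_path(['deps.zip', 'helper.py']))  # ['deps.zip'] -- helper.py is dropped
```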
Tests run:
Test build #33352 has finished for PR 6360 at commit
Test build #33363 has finished for PR 6360 at commit
Jenkins, retest this please.
Test build #33381 has finished for PR 6360 at commit
I think primaryPyFile is only used in the ApplicationMaster, so it is only distributed to the AppMaster.
Don't you need the file around when you run code in the executors? How is the code shipped to executors otherwise?
I can try it out, but it feels weird to have the code used only in the AM.
Hmm, strangely enough it seems things work without distributing the main file. Wonder if that's safe in all cases or just a side-effect of the really simple test code, though.
I'm not an expert on this code by any means. The problem description and solution sound reasonable and match up with the change. I skimmed the change and didn't see anything I'd flag. Tests pass. I'd love to get more Python folks to look at it, like @davies (and I see @lianhuiwang is already looking too), but this appears to be a good cleanup and problem solving change for 1.5 /
Test build #33481 has finished for PR 6360 at commit
Ping @tgravescs @andrewor14
@vanzin sorry I'm going to be on vacation for the next week
Conflicts: core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala
Test build #33921 has finished for PR 6360 at commit
Test build #33995 timed out for PR 6360 at commit
jenkins, retest this please.
Test build #34016 has finished for PR 6360 at commit
@tgravescs shall we give you a few more days to take a look? Tests pass and I tend to trust Mr Vanzin here. @davies do you have any input from a Python perspective?
Hey I'll take a look at this later today, sorry for slipping.
I actually find the prior organization of this code a little easier to follow. It read:
    if (args.isPython && clusterManager == YARN) { /* do something */ }
    if (args.isPython && deployMode == CLIENT) { /* do something else */ }
So if I only cared about python but not YARN, I could just skip the first block and read the second block. Now it's one giant python block. Not a big deal, but if merging these isn't strictly necessary I would prefer the old structure.
(also it'll be easier to review and back port stuff later)
Ok, I'll revert to the old style. I actually would like to move the YARN code to Client.scala but didn't want to pile that up onto this change.
yeah that might be a good change
I ended up doing that in this change, since I needed to modify SparkSubmit anyway because the two conditions needed to run in the opposite order... well, long story short, the YARN code is now in Client.scala, where it should have always been.
Test build #34286 has finished for PR 6360 at commit
Hmm, moving the archive-finding code to Client.scala caused the failure... looking.
Need a different way to tell Client.scala to distribute pyspark libs.
Test build #34316 has finished for PR 6360 at commit
@andrewor14 how's this looking to you? I think the review feedback was incorporated. Do we need @tgravescs to have a look?
I'll take a look again later today, but last time I checked this is already in a good state.
no need to wait for me. I had taken a quick look and think the overall approach looks good.
Can't this just be `val mainArgs = userArgs.toArray`? Am I missing something?
This comes from all the way back when there was yarn/alpha and yarn/stable, but yeah, it doesn't look necessary.
Ok, I tested this on a Hadoop 2.4 cluster and verified that both client and cluster modes work with
One higher level thing I'd like to mention is that the YARN code is getting quite difficult to review. We cleaned up a lot of the duplicate code and added documentation everywhere at one point, and that helped a lot. However, it is still difficult to follow the code or ensure its correctness because of a few things:
Rather than suggesting that we fix all these at once, I'm just jotting down my thoughts so we don't forget to fix these down the road. I'll add these thoughts to the umbrella YARN cleanup JIRA.
Yeah, that goes back a long way. It really needs some cleaning up. It's not just the size of the methods, but the implicit dependencies they have on one another (method A sets some instance field that is needed by method B, so they must be executed in that order or things break). That's the trickiest thing when messing with this code (the size is not that big of an issue IMO).
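To make that concrete, here is a toy sketch (hypothetical names, not actual Client.scala code) of the kind of implicit ordering dependency being described:

```python
# Hypothetical illustration of the anti-pattern described above: method A
# stashes state in an instance field that method B silently requires.
class Client:
    def __init__(self):
        self.staging_dir = None

    def prepare_staging_dir(self):      # "method A"
        self.staging_dir = "/tmp/staging"

    def upload_resources(self):         # "method B"
        # Misbehaves unless prepare_staging_dir() already ran -- nothing in
        # the signature makes that ordering requirement visible to callers.
        return f"uploading to {self.staging_dir}/resources"

c = Client()
print(c.upload_resources())  # "uploading to None/resources" -- silent misbehaviour
c.prepare_staging_dir()
print(c.upload_resources())  # now correct
```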
OK, I've updated SPARK-3492. Yes, all implicit dependencies should be documented or even removed.
Test build #34629 has finished for PR 6360 at commit
LGTM, merging into master. Thanks @vanzin.
Author: Marcelo Vanzin <[email protected]>
Closes apache#6360 from vanzin/SPARK-5479 and squashes the following commits:
bcaf7e6 [Marcelo Vanzin] Feedback.
c47501f [Marcelo Vanzin] Fix yarn-client mode.
46b1d0c [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
c743778 [Marcelo Vanzin] Only pyspark cares about python archives.
c8e5a82 [Marcelo Vanzin] Actually run pyspark in client mode.
705571d [Marcelo Vanzin] Move some code to the YARN module.
1dd4d0c [Marcelo Vanzin] Review feedback.
71ee736 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
220358b [Marcelo Vanzin] Scalastyle.
cdbb990 [Marcelo Vanzin] Merge branch 'master' into SPARK-5479
7fe3cd4 [Marcelo Vanzin] No need to distribute primary file to executors.
09045f1 [Marcelo Vanzin] Style.
943cbf4 [Marcelo Vanzin] [SPARK-5479] [yarn] Handle --py-files correctly in YARN.
@vanzin I am reading the YARN-related code, especially org.apache.spark.deploy.yarn.Client.scala. Another question is: if I specify spark.yarn.jar as an HDFS location, the YARN client will still copy it to the staging directory, and I don't know why we do this. Wouldn't it be easier to just use the HDFS file as a LocalResource without copying?
@zjffdu please don't comment on closed PRs, especially ancient ones; that's super confusing. Use the dev@ list for that. To answer your questions: the "local:" scheme is defined in Spark's documentation. Defining a "hdfs" location for
Thanks @vanzin, my fault: I specified the hdfs location using the hostname, while it is an IP address in core-site.xml (anyway, maybe we can improve it here).
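For context, whether the jar gets copied generally comes down to whether the client considers the source and the staging directory to be on the same filesystem. A rough, hypothetical sketch of that kind of URI comparison (not the actual Client.scala logic) shows why a hostname vs. IP mismatch would force a copy:

```python
# Rough sketch of a "same filesystem?" check based on URI scheme and authority.
# Hypothetical simplification -- the real decision is made in Client.scala.
from urllib.parse import urlparse

def same_filesystem(src, dest):
    s, d = urlparse(src), urlparse(dest)
    return (s.scheme, s.netloc) == (d.scheme, d.netloc)

# Hostname vs. IP address makes the authorities differ, so the file is
# treated as remote and copied to the staging directory anyway.
print(same_filesystem("hdfs://namenode:8020/jars/spark.jar",
                      "hdfs://10.0.0.5:8020/user/spark/.sparkStaging"))  # False
```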
…n Yarn client mode
## What changes were proposed in this pull request?
### Problem
When we run the _PySpark shell in Yarn client mode_, files specified with `--py-files` are not recognised on the _driver side_.
Here are the steps I took to check:
```bash
$ cat /home/spark/tmp.py
def testtest():
return 1
```
```bash
$ ./bin/pyspark --master yarn --deploy-mode client --py-files /home/spark/tmp.py
```
```python
>>> def test():
... import tmp
... return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect() # executor side
[1]
>>> test() # driver side
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in test
ImportError: No module named tmp
```
### How did it happen?
Unlike Yarn cluster and client mode with spark-submit, when running the PySpark shell in Yarn client mode specifically:
1. It first runs the Python shell via:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/launcher/src/main/java/org/apache/spark/launcher/SparkSubmitCommandBuilder.java#L158 as pointed out by tgravescs in the JIRA.
2. this triggers shell.py and submits another application to launch a Py4J gateway:
https://github.com/apache/spark/blob/209b9361ac8a4410ff797cff1115e1888e2f7e66/python/pyspark/java_gateway.py#L45-L60
3. it runs a Py4J gateway:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L425
4. it copies (or downloads) --py-files into a local temp directory:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L365-L376
and these files are then set in `spark.submit.pyFiles`
5. Py4J JVM is launched and then the Python paths are set via:
https://github.com/apache/spark/blob/7013eea11cb32b1e0038dc751c485da5c94a484b/python/pyspark/context.py#L209-L216
However, these are not actually set, because those files were copied into a tmp directory in step 4, whereas this code path looks for `SparkFiles.getRootDirectory`, where files are stored only when `SparkContext.addFile()` is called.
In other cluster modes, `spark.files` is set via:
https://github.com/apache/spark/blob/3cb82047f2f51af553df09b9323796af507d36f8/core/src/main/scala/org/apache/spark/deploy/SparkSubmit.scala#L554-L555
and those files are explicitly added via:
https://github.com/apache/spark/blob/ecb8b383af1cf1b67f3111c148229e00c9c17c40/core/src/main/scala/org/apache/spark/SparkContext.scala#L395
So we are fine in other modes.
In the case of Yarn client and cluster mode with _spark-submit_, these are handled manually. In particular, apache#6360 added most of the logic. In this case, the Python path looks to be set manually via, for example, `deploy.PythonRunner`. We don't use `spark.files` here.
### How does the PR fix the problem?
I tried to keep the approach as isolated as possible: simply copy the .py or .zip files into `SparkFiles.getRootDirectory()` on the driver side if they are not already there. Another possible way is to set `spark.files`, but that does unnecessary extra work and sounds a bit invasive.
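A minimal sketch of that idea (simplified, hypothetical helper; it assumes an active SparkContext so `SparkFiles.getRootDirectory()` resolves, and it is not the exact patch):

```python
# Minimal sketch of the approach described above: copy each --py-files entry
# into SparkFiles.getRootDirectory() on the driver if it is not already there,
# so the existing sys.path setup in context.py can find it.
import os
import shutil

from pyspark import SparkFiles

def localize_py_files(py_files):
    # Requires an active SparkContext/SparkSession on the driver.
    root = SparkFiles.getRootDirectory()
    for path in py_files:
        target = os.path.join(root, os.path.basename(path))
        if not os.path.exists(target):
            shutil.copy(path, target)

# e.g. localize_py_files(["/home/spark/tmp.py"]) before importing `tmp` on the driver
```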
**Before**
```python
>>> def test():
... import tmp
... return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
Traceback (most recent call last):
File "<stdin>", line 1, in <module>
File "<stdin>", line 2, in test
ImportError: No module named tmp
```
**After**
```python
>>> def test():
... import tmp
... return tmp.testtest()
...
>>> spark.range(1).rdd.map(lambda _: test()).collect()
[1]
>>> test()
1
```
## How was this patch tested?
I manually tested in standalone and yarn-cluster modes with the PySpark shell. .zip and .py files were also tested with steps similar to those above. It's difficult to add a test.
Author: hyukjinkwon <[email protected]>
Closes apache#21267 from HyukjinKwon/SPARK-21945.
…n Yarn client mode
Author: hyukjinkwon <[email protected]>
Closes #21267 from HyukjinKwon/SPARK-21945.
(cherry picked from commit 9a641e7)
Signed-off-by: Marcelo Vanzin <[email protected]>